{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<Center><h3> Part 2 : Parallelized Stochastic Gradient Descent"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Import Section "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "import sys\n",
    "import math\n",
    "from time import time\n",
    "import random\n",
    "import csv\n",
    "import numpy\n",
    "from pyspark import SparkContext\n",
    "from scipy import sparse\n",
    "import numpy as np\n",
    "from functions import *\n",
    "%matplotlib inline"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Initialize the used parameters"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "metadata": {},
   "outputs": [],
   "source": [
    "rho = 0.2\n",
    "C = 6 # Number of factors\n",
    "nbr_iter = 50 # number of iterations\n",
    "block_number = 5 # number of blocks to take from the matrix"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "** Non-parallelized SGD Algorithm **\n",
    "\n",
    "- Pick $j$ uniformly at random in $\\{1, \\ldots, n\\}$\n",
    "- $\\forall t = 1 \\ldots T :$\n",
    "\n",
    "$$\n",
    "w_{t+1} \\gets w_t - \\eta \\nabla f_j(w_t)\n",
    "$$\n",
    "\n",
    "where $\\eta$ is the step size .\n",
    "To improve the performance of the algorithm we take $\\eta$ = $\\frac{\\eta_0}{\\sqrt{t+1}}$"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [],
   "source": [
    "def SGD(R, Q, P, mask, Ni, Nj, blockRange):\n",
    "    \"\"\"\n",
    "    This function is an implementation of the SGD algorithm described above.\n",
    "    Input : R, Q, P, mask, Ni, Nj, blockRange\n",
    "    Output : Q, P, n, blockRange\n",
    "    \"\"\"\n",
    "    \n",
    "    global rho\n",
    "    eta = .01 # first step size\n",
    "    R_new = R.nonzero()\n",
    "    n = R_new[0].size\n",
    "    \n",
    "    for i in range(n):\n",
    "        \n",
    "        j = random.randint(0, n-1) # Pick randomly an element j\n",
    "        row, col = R_new[0][j], R_new[1][j] # retrieve the row and column of the random j\n",
    "        \n",
    "        # take a small blocks from R, mask, Q and P\n",
    "        Ri = R[row,col] \n",
    "        maski = mask[row,col]\n",
    "        Qi = Q[row,:]\n",
    "        Pi = P[:,col]\n",
    "        \n",
    "        # compute the gradient of Qi and Pi\n",
    "        _, grad_Q = objective_Q(Pi, Qi, Ri, maski, rho/Ni[row])\n",
    "        _, grad_P = objective_P(Pi, Qi, Ri, maski, rho/Nj[col])\n",
    "        eta = eta * (1 + i) ** (- 0.5)\n",
    "        \n",
    "        # update the blocks of P and Q\n",
    "        Q[row,:] = Qi - eta * grad_Q\n",
    "        P[:,col] = Pi - eta * grad_P\n",
    "        #print(np.linalg.norm(Q[row,:]))\n",
    "        \n",
    "    return (Q, P, n, blockRange)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Parallelized SGD Algorithm"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "metadata": {},
   "outputs": [],
   "source": [
    "def Parallelized_SGD(R, mask):\n",
    "    \"\"\"\n",
    "    This function performs the Parallelized SGD algorithm\n",
    "    Input : R, mask\n",
    "    Output : Q, P\n",
    "    \"\"\"\n",
    "    \n",
    "    global nbr_iter, block_number, C\n",
    "    \n",
    "    Q = numpy.random.random_sample((R.shape[0], C))\n",
    "    P = numpy.random.random_sample((C, R.shape[1]))\n",
    "    block_i = (R.shape[0]/block_number, R.shape[1]/block_number)\n",
    "    \n",
    "    \n",
    "    rowRangeList = [[k*block_i[0],(k+1)*block_i[0]] for k in range(block_number)]\n",
    "    colRangeList = [[k*block_i[1],(k+1)*block_i[1]] for k in range(block_number)]\n",
    "\n",
    "    rowRangeList[-1][1] += R.shape[0]%block_number\n",
    "    colRangeList[-1][1] += R.shape[1]%block_number\n",
    "\n",
    "\n",
    "    for iter_ in range(nbr_iter):\n",
    "        if iter_ % 10 == 0:\n",
    "            print(\"... iteration %s\"%(iter_))\n",
    "        \n",
    "        for epoch in range(block_number):\n",
    "            grid = []\n",
    "            \n",
    "            for block in range(block_number):\n",
    "                rowRange = [int(rowRangeList[block][0]), int(rowRangeList[block][1])]\n",
    "                colRange = [int(colRangeList[block][0]), int(colRangeList[block][1])]\n",
    "                \n",
    "                Rn = R[rowRange[0]:rowRange[1], colRange[0]:colRange[1]]\n",
    "                maskn = mask[rowRange[0]:rowRange[1], colRange[0]:colRange[1]]\n",
    "                Qn = Q[rowRange[0]:rowRange[1],:]\n",
    "                Pn = P[:,colRange[0]:colRange[1]]\n",
    "                \n",
    "                Ni = {}\n",
    "                for i in range(rowRange[0],rowRange[1]):\n",
    "                    Ni[int(i-int(rowRange[0]))] = R[i,:].nonzero()[0].size\n",
    "                    \n",
    "                Nj = {}\n",
    "                for i in range(colRange[0],colRange[1]):\n",
    "                    Nj[i-colRange[0]] = R[:,i].nonzero()[0].size \n",
    "                    \n",
    "                if (Rn.nonzero()[0].size != 0):\n",
    "                    grid.append([Rn, Qn, Pn, maskn, Ni, Nj, (rowRange, colRange)])\n",
    "                    \n",
    "                    \n",
    "                    \n",
    "            rdd = sc.parallelize(grid, block_number).\\\n",
    "                        map(lambda x: SGD(x[0],x[1],x[2],x[3],x[4],x[5],x[6])).collect()\n",
    "                \n",
    "                \n",
    "            for elem in rdd:\n",
    "                rowRange,colRange = elem[3]\n",
    "                Q[rowRange[0]:rowRange[1],:] = elem[0]\n",
    "                P[:,colRange[0]:colRange[1]] = elem[1]\n",
    "\n",
    "            colRangeList.insert(0,colRangeList.pop())\n",
    "            \n",
    "            \n",
    "            \n",
    "    return Q,P"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "metadata": {},
   "outputs": [],
   "source": [
    "def outputMatrix(A, path):\n",
    "    \"\"\"\n",
    "    This function outputs a matrix to a csv file\n",
    "    \"\"\"\n",
    "    f = open(path, 'w', 100)\n",
    "    rows= A.shape[0]\n",
    "    cols = A.shape[1]\n",
    "    for row in range(rows):\n",
    "        for col in range(cols):  \n",
    "            if col == cols-1:\n",
    "                f.write(str(A[row,col])) \n",
    "            else:\n",
    "                f.write(str(A[row,col]) + \",\")\n",
    "        f.write(\"\\n\")\n",
    "\n",
    "    f.flush()\n",
    "    f.close()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "metadata": {},
   "outputs": [],
   "source": [
    "def load_data(filename=\"u.data\" , small_data=True):\n",
    "    \"\"\"\n",
    "    This function returns :\n",
    "        R : the matrix user-item containing the ratings\n",
    "        mask : matrix is equal to 1 if a score existes and 0 otherwise\n",
    "    \"\"\"\n",
    "    data = np.loadtxt(filename, dtype=int)\n",
    "    \n",
    "    R = sparse.csr_matrix((data[:, 2], (data[:, 0]-1, data[:, 1]-1)),dtype=float)\n",
    "    mask = sparse.csr_matrix((np.ones(data[:, 2].shape),(data[:, 0]-1, data[:, 1]-1)), dtype=bool )\n",
    "\n",
    "    # take a small part of the whole data for testing \n",
    "    if small_data is True:\n",
    "        R = (R[0:100, 0:100].copy())\n",
    "        mask = (mask[0:100, 0:100].copy())\n",
    "        \n",
    "        \n",
    "    return R.toarray(), mask.toarray()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Testing our algorithm"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 28,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Start Process ....\n",
      "... iteration 0\n",
      "... iteration 10\n",
      "... iteration 20\n",
      "... iteration 30\n",
      "... iteration 40\n",
      "Process finished in 1227.4811024665833 s\n"
     ]
    }
   ],
   "source": [
    "global R, P, Q\n",
    "spark = SparkContext.getOrCreate()\n",
    "output_Q = \"Q.csv\"\n",
    "output_P = \"P.csv\"\n",
    "\n",
    "#load data\n",
    "R, mask = load_data(filename=\"u.data\" , small_data=True)\n",
    "t = time()\n",
    "print(\"Start Process ....\")\n",
    "Q, P = Parallelized_SGD(R, mask)\n",
    "print(\"Process finished in %s s\"%(time()-t))\n",
    "# Wrtie the obtained Matrices to csv file\n",
    "outputMatrix(P,output_P)\n",
    "outputMatrix(Q,output_Q)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 29,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[[ 5.24681419  3.44999746  2.93827376 ...,  4.93948659  4.20981051\n",
      "   5.90208862]\n",
      " [ 3.45890691  2.88852161  1.94166001 ...,  3.45883726  2.87506586\n",
      "   3.92243295]\n",
      " [ 3.28104943  2.11192302  2.53980984 ...,  3.09046697  2.66709054\n",
      "   3.67397603]\n",
      " ..., \n",
      " [ 2.79261176  2.11598226  2.5189713  ...,  2.36535702  1.94035837\n",
      "   2.67286888]\n",
      " [ 2.86420341  1.8404591   2.10479273 ...,  2.83349656  2.49764757\n",
      "   3.71285494]\n",
      " [ 2.82295066  2.46283972  2.08517971 ...,  2.90593869  2.04198094\n",
      "   3.28897811]]\n",
      "[[ 5.  3.  4. ...,  4.  3.  5.]\n",
      " [ 4.  0.  0. ...,  0.  0.  5.]\n",
      " [ 0.  0.  0. ...,  0.  0.  0.]\n",
      " ..., \n",
      " [ 0.  0.  0. ...,  0.  0.  0.]\n",
      " [ 4.  0.  3. ...,  5.  0.  5.]\n",
      " [ 0.  0.  0. ...,  0.  0.  0.]]\n"
     ]
    }
   ],
   "source": [
    "R_result = Q.dot(P)\n",
    "print(R_result)\n",
    "print(R)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We can see that the algorithm works relatively well since non zero values in R are very similar to those in QP at the same positions.  "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 42,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Relative error : 26.429458\n"
     ]
    }
   ],
   "source": [
    "relative_error = np.linalg.norm(mask*(R - np.dot(Q, P))) / np.linalg.norm(R) * 100\n",
    "print(\"Relative error : %f\"%relative_error)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Recommendation"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "In order to find the movie that we'll recommand to the user n° u, we have to consider the decomposition $ R = QP$.\n",
    "\n",
    "In fact, by multiplying the $u^{th}$ row of Q by the matrix P, we obtain a vector r of size $ I$ x $ 1$ that contains all the estimated ratings of movies for this user u.   \n",
    "\n",
    "Now, we only need to consider the highest score and take its index (which correspond to the recommanded movie), however we shoudn't forget that we have to avoid recommanding a movie that the user had already seen, that's why we need to multiply our vector r by the opposite of the mask. \n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "** What recommanding to the user n° 45 ?** "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 40,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "We recommand to the user n° 45 the movie n°  55\n",
      "The rating of this movie is : 3.447 \n"
     ]
    }
   ],
   "source": [
    "u = 45\n",
    "r = np.dot(Q[u,:], P)\n",
    "\n",
    "r = r * (1 - mask[u,:])\n",
    "\n",
    "movie_index = np.argmax(r)\n",
    "print(\"We recommand to the user n° %s the movie n°  %d\"%(u,movie_index))\n",
    "print (\"The rating of this movie is : %0.3f \"%r[movie_index])"
   ]
  }
 ],
 "metadata": {
  "anaconda-cloud": {},
  "kernelspec": {
   "display_name": "Python [default]",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.0"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
